// This file is made available under Elastic License 2.0.
// This file is based on code available under the Apache license here:
//   https://github.com/apache/orc/tree/main/c++/src/Reader.cc

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "Reader.hh"

#include <algorithm>
#include <iostream>
#include <iterator>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <vector>

#include "Adaptor.hh"
#include "BloomFilter.hh"
#include "Options.hh"
#include "Statistics.hh"
#include "StripeStream.hh"
#include "wrap/coded-stream-wrapper.h"

namespace orc {

// Returns a shared WriterVersionImpl wrapping WriterVersion_HIVE_8732.
const WriterVersionImpl& WriterVersionImpl::VERSION_HIVE_8732() {
    // Function-local static: constructed on the first call, reused afterwards.
    static const WriterVersionImpl hive8732(WriterVersion_HIVE_8732);
    return hive8732;
}

// Returns the compression block size stored in the postscript, or 256 KiB
// when the postscript does not carry the field.
uint64_t getCompressionBlockSize(const proto::PostScript& ps) {
    if (!ps.has_compressionblocksize()) {
        return 256 * 1024;
    }
    return ps.compressionblocksize();
}

// Converts the postscript's compression field to a CompressionKind.
// Throws ParseError when the postscript has no compression field.
CompressionKind convertCompressionKind(const proto::PostScript& ps) {
    if (!ps.has_compression()) {
        throw ParseError("Unknown compression type");
    }
    return static_cast<CompressionKind>(ps.compression());
}

// Joins the entries of `columns` with '.' separators; an empty `columns`
// yields an empty string.
std::string ColumnSelector::toDotColumnPath() {
    std::string dottedPath;
    bool isFirst = true;
    for (const auto& component : columns) {
        if (!isFirst) {
            dottedPath += '.';
        }
        dottedPath += component;
        isFirst = false;
    }
    return dottedPath;
}

// Returns the writer version recorded in the postscript, or
// WriterVersion_ORIGINAL when the postscript has no writer version.
WriterVersion getWriterVersionImpl(const FileContents* contents) {
    const auto& postscript = *contents->postscript;
    if (postscript.has_writerversion()) {
        return static_cast<WriterVersion>(postscript.writerversion());
    }
    return WriterVersion_ORIGINAL;
}

// Marks `type` and every column id up to type.getMaximumColumnId() as
// selected. Does nothing when the type's own column id is already selected.
void ColumnSelector::selectChildren(std::vector<bool>& selectedColumns, const Type& type) {
    const size_t rootId = static_cast<size_t>(type.getColumnId());
    if (selectedColumns[rootId]) {
        return;
    }
    selectedColumns[rootId] = true;
    for (size_t columnId = rootId; columnId <= type.getMaximumColumnId(); ++columnId) {
        selectedColumns[columnId] = true;
    }
}

/**
   * Walks the type tree depth-first and marks a type as selected whenever
   * it, or any type below it, is selected.
   * @return true if this type or any of its descendants is selected.
   */
bool ColumnSelector::selectParents(std::vector<bool>& selectedColumns, const Type& type) {
    const size_t columnId = static_cast<size_t>(type.getColumnId());
    bool anySelected = selectedColumns[columnId];
    for (uint64_t child = 0; child < type.getSubtypeCount(); ++child) {
        // Always recurse (no short-circuit) so every subtree gets updated.
        const bool childSelected = selectParents(selectedColumns, *type.getSubtype(child));
        anySelected = anySelected || childSelected;
    }
    selectedColumns[columnId] = anySelected;
    return anySelected;
}

/**
   * Recurse over a type tree and build two maps
   * map<TypeName, TypeId>, map<TypeId, Type>
   */
void ColumnSelector::buildTypeNameIdMap(const Type* type) {
    // map<type_id, Type*>
    idTypeMap[type->getColumnId()] = type;

    if (STRUCT == type->getKind()) {
        for (size_t i = 0; i < type->getSubtypeCount(); ++i) {
            const std::string& fieldName = type->getFieldName(i);
            columns.push_back(fieldName);
            nameIdMap[toDotColumnPath()] = type->getSubtype(i)->getColumnId();
            buildTypeNameIdMap(type->getSubtype(i));
            columns.pop_back();
        }
    } else {
        // other non-primitive type
        for (size_t j = 0; j < type->getSubtypeCount(); ++j) {
            buildTypeNameIdMap(type->getSubtype(j));
        }
    }
}

// Rebuilds `selectedColumns` (one flag per type in the footer) from the row
// reader options: by field index, by field name, by type id, or - when none
// of those apply - everything. Parents of selected columns and column 0 are
// always selected afterwards.
void ColumnSelector::updateSelected(std::vector<bool>& selectedColumns, const RowReaderOptions& options) {
    const size_t typeCount = static_cast<size_t>(contents->footer->types_size());
    selectedColumns.assign(typeCount, false);

    const bool rootIsStruct = contents->schema->getKind() == STRUCT;
    if (rootIsStruct && options.getIndexesSet()) {
        for (uint64_t fieldId : options.getInclude()) {
            updateSelectedByFieldId(selectedColumns, fieldId);
        }
    } else if (rootIsStruct && options.getNamesSet()) {
        for (const auto& fieldName : options.getIncludeNames()) {
            updateSelectedByName(selectedColumns, fieldName);
        }
    } else if (options.getTypeIdsSet()) {
        for (uint64_t typeId : options.getInclude()) {
            updateSelectedByTypeId(selectedColumns, typeId);
        }
    } else {
        // default is to select all columns
        selectedColumns.assign(selectedColumns.size(), true);
    }

    selectParents(selectedColumns, *contents->schema);
    selectedColumns[0] = true; // column 0 is selected by default
}

// Selects the root-level field with index `fieldId` and its children.
// Throws ParseError when `fieldId` is not a valid root field index.
void ColumnSelector::updateSelectedByFieldId(std::vector<bool>& selectedColumns, uint64_t fieldId) {
    if (fieldId >= contents->schema->getSubtypeCount()) {
        std::stringstream message;
        message << "Invalid column selected " << fieldId << " out of " << contents->schema->getSubtypeCount();
        throw ParseError(message.str());
    }
    selectChildren(selectedColumns, *contents->schema->getSubtype(fieldId));
}

void ColumnSelector::updateSelectedByTypeId(std::vector<bool>& selectedColumns, uint64_t typeId) {
    if (typeId < selectedColumns.size()) {
        const Type& type = *idTypeMap[typeId];
        selectChildren(selectedColumns, type);
    } else {
        std::stringstream buffer;
        buffer << "Invalid type id selected " << typeId << " out of " << selectedColumns.size();
        throw ParseError(buffer.str());
    }
}

// Selects the column whose dotted path equals `fieldName`, together with its
// children. Throws ParseError when the name is not present in nameIdMap.
void ColumnSelector::updateSelectedByName(std::vector<bool>& selectedColumns, const std::string& fieldName) {
    const auto match = nameIdMap.find(fieldName);
    if (match == nameIdMap.end()) {
        throw ParseError("Invalid column selected " + fieldName);
    }
    updateSelectedByTypeId(selectedColumns, match->second);
}

// Builds the id->type and name->id lookup maps from the file's schema.
ColumnSelector::ColumnSelector(const FileContents* _contents) : contents(_contents) {
    const Type* rootType = contents->schema.get();
    buildTypeNameIdMap(rootType);
}

// Sets up a row reader over the stripes whose start offset lies in the byte
// range [opts.getOffset(), opts.getOffset() + opts.getLength()).
//
// - firstRowOfStripe[i] receives the number of rows preceding stripe i.
// - currentStripe/firstStripe become the first in-range stripe (or the stripe
//   count when none is in range); lastStripe becomes one past the last
//   in-range stripe (0 when none is in range).
// - previousRow is positioned just before the first row to be read.
// - selectedColumns is computed from the options, and a SargsApplier is
//   created when a search argument is supplied and the file has a row index
//   stride.
RowReaderImpl::RowReaderImpl(const std::shared_ptr<FileContents>& _contents, const RowReaderOptions& opts)
        : localTimezone(getLocalTimezone()),
          contents(_contents),
          throwOnHive11DecimalOverflow(opts.getThrowOnHive11DecimalOverflow()),
          forcedScaleOnHive11Decimal(opts.getForcedScaleOnHive11Decimal()),
          footer(contents->footer.get()),
          firstRowOfStripe(*contents->pool, 0),
          enableEncodedBlock(opts.getEnableLazyDecoding()),
          readerTimezone(getTimezoneByName(opts.getTimezoneName())),
          useWriterTimezone(opts.getUseWriterTimezone()),
          sharedBuffer(*contents->pool, 0) {
    const uint64_t numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
    currentStripe = numberOfStripes;
    lastStripe = 0;
    currentRowInStripe = 0;
    rowsInCurrentStripe = 0;
    uint64_t rowTotal = 0;

    const uint64_t rangeStart = opts.getOffset();
    const uint64_t rangeLength = opts.getLength();

    firstRowOfStripe.resize(numberOfStripes);
    for (size_t i = 0; i < numberOfStripes; ++i) {
        firstRowOfStripe[i] = rowTotal;
        // Bind by reference: the stripe information is only read here.
        const proto::StripeInformation& stripeInfo = footer->stripes(static_cast<int>(i));
        rowTotal += stripeInfo.numberofrows();

        // Written as a subtraction so that rangeStart + rangeLength cannot
        // wrap around uint64_t (e.g. non-zero offset with a maximal length).
        const uint64_t stripeOffset = stripeInfo.offset();
        const bool isStripeInRange = stripeOffset >= rangeStart && stripeOffset - rangeStart < rangeLength;
        if (isStripeInRange) {
            if (i < currentStripe) {
                currentStripe = i;
            }
            if (i >= lastStripe) {
                lastStripe = i + 1;
            }
        }
    }
    firstStripe = currentStripe;

    if (currentStripe == 0) {
        // Nothing precedes row 0; max() acts as the "before the first row" marker.
        previousRow = (std::numeric_limits<uint64_t>::max)();
    } else if (currentStripe == numberOfStripes) {
        // No stripe in range: position at the end of the file.
        previousRow = footer->numberofrows();
    } else {
        previousRow = firstRowOfStripe[firstStripe] - 1;
    }

    ColumnSelector column_selector(contents.get());
    column_selector.updateSelected(selectedColumns, opts);

    // prepare SargsApplier if SearchArgument is available
    if (opts.getSearchArgument() && footer->rowindexstride() > 0) {
        sargs = opts.getSearchArgument();
        sargsApplier.reset(new SargsApplier(*contents->schema, sargs.get(), opts.getRowReaderFilter().get(),
                                            footer->rowindexstride(), getWriterVersionImpl(_contents.get())));
    }
}

// Returns the compression kind stored in the shared file contents.
CompressionKind RowReaderImpl::getCompression() const {
    const FileContents& fileContents = *contents;
    return fileContents.compression;
}

uint64_t RowReaderImpl::getCompressionSize() const {
    return contents->blockSize;
}

// Returns a copy of the per-column selection flags.
const std::vector<bool> RowReaderImpl::getSelectedColumns() const {
    std::vector<bool> selectionCopy(selectedColumns);
    return selectionCopy;
}

// Returns the schema restricted to the selected columns. The selected schema
// is built on the first call and cached in selectedSchema.
const Type& RowReaderImpl::getSelectedType() const {
    if (!selectedSchema) {
        selectedSchema = buildSelectedType(contents->schema.get(), selectedColumns);
    }
    return *selectedSchema;
}

// Returns the current value of previousRow.
uint64_t RowReaderImpl::getRowNumber() const {
    const uint64_t rowNumber = previousRow;
    return rowNumber;
}

// Positions the reader at absolute row `rowNumber`.
//
// Seeking outside the stripes this reader covers ([firstStripe, lastStripe))
// parks the reader at the end of the file (currentStripe = stripe count,
// previousRow = total row count). Otherwise the stripe containing the row is
// started and, when no SargsApplier is present, the column readers are
// advanced to the row: first via the row index (when the stripe has one),
// then by skipping the remaining rows.
void RowReaderImpl::seekToRow(uint64_t rowNumber) {
    // Empty file
    // (lastStripe stays 0 when no stripe was found in the reader's range)
    if (lastStripe == 0) {
        return;
    }

    // If we are reading only a portion of the file
    // (bounded by firstStripe and lastStripe),
    // seeking before or after the portion of interest should return no data.
    // Implement this by setting previousRow to the number of rows in the file.

    // seeking past lastStripe
    uint64_t num_stripes = static_cast<uint64_t>(footer->stripes_size());
    if ((lastStripe == num_stripes && rowNumber >= footer->numberofrows()) ||
        (lastStripe < num_stripes && rowNumber >= firstRowOfStripe[lastStripe])) {
        currentStripe = num_stripes;
        previousRow = footer->numberofrows();
        return;
    }

    // Find the last stripe below lastStripe whose first row is <= rowNumber.
    uint64_t seekToStripe = 0;
    while (seekToStripe + 1 < lastStripe && firstRowOfStripe[seekToStripe + 1] <= rowNumber) {
        seekToStripe++;
    }

    // seeking before the first stripe
    if (seekToStripe < firstStripe) {
        currentStripe = num_stripes;
        previousRow = footer->numberofrows();
        return;
    }

    // The reader state must be set before startNextStripe() is called.
    currentStripe = seekToStripe;
    currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe];
    previousRow = rowNumber;
    startNextStripe();

    // when predicate push down is enabled, above call to startNextStripe()
    // will move current row to 1st matching row group; here we only need
    // to deal with the case when PPD is not enabled.
    if (!sargsApplier) {
        uint64_t rowsToSkip = currentRowInStripe;

        // Use the row index (if the file and stripe have one) to jump to the
        // start of the row group containing the target row.
        if (footer->rowindexstride() > 0 && currentStripeInfo.indexlength() > 0) {
            if (rowIndexes.empty()) {
                loadStripeIndex();
            }
            uint32_t rowGroupId = static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride());
            // Only the rows inside the target row group remain to be skipped.
            rowsToSkip -= static_cast<uint64_t>(rowGroupId) * footer->rowindexstride();

            // Row group 0 needs no seek: the skip below starts counting from
            // the beginning of the stripe.
            if (rowGroupId != 0) {
                seekToRowGroup(rowGroupId);
            }
        }

        reader->skip(rowsToSkip);
    }
}

// Loads the row indexes and UTF8 bloom filter indexes of the current stripe
// for every selected column, replacing whatever rowIndexes and
// bloomFilterIndex held before.
//
// Streams are walked in stripe-footer order; `offset` tracks the file
// position of each stream starting at the stripe offset.
//
// Throws ParseError when a stream refers to a column id outside
// selectedColumns, or when an index stream cannot be parsed.
void RowReaderImpl::loadStripeIndex() {
    // reset all previous row indexes
    rowIndexes.clear();
    bloomFilterIndex.clear();

    // obtain row indexes for selected columns
    uint64_t offset = currentStripeInfo.offset();
    for (int i = 0; i < currentStripeFooter.streams_size(); ++i) {
        const proto::Stream& pbStream = currentStripeFooter.streams(i);
        const uint64_t colId = pbStream.column();
        // The column id comes from the file; reject ids that would index
        // past the selection flags instead of reading out of bounds.
        if (colId >= selectedColumns.size()) {
            std::stringstream msg;
            msg << "Malformed stream meta in stripe footer: column=" << colId
                << ", numColumns=" << selectedColumns.size();
            throw ParseError(msg.str());
        }
        if (selectedColumns[colId] && pbStream.has_kind() &&
            (pbStream.kind() == proto::Stream_Kind_ROW_INDEX ||
             pbStream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8)) {
            std::unique_ptr<SeekableInputStream> inStream =
                    createDecompressor(getCompression(),
                                       std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
                                               contents->stream.get(), offset, pbStream.length(), *contents->pool)),
                                       getCompressionSize(), *contents->pool);

            if (pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
                proto::RowIndex rowIndex;
                if (!rowIndex.ParseFromZeroCopyStream(inStream.get())) {
                    throw ParseError("Failed to parse the row index");
                }
                // The local is not used again; move it instead of copying.
                rowIndexes[colId] = std::move(rowIndex);
            } else { // Stream_Kind_BLOOM_FILTER_UTF8
                proto::BloomFilterIndex pbBFIndex;
                if (!pbBFIndex.ParseFromZeroCopyStream(inStream.get())) {
                    throw ParseError("Failed to parse bloom filter index");
                }
                BloomFilterIndex bfIndex;
                for (int j = 0; j < pbBFIndex.bloomfilter_size(); j++) {
                    bfIndex.entries.push_back(BloomFilterUTF8Utils::deserialize(
                            pbStream.kind(), currentStripeFooter.columns(static_cast<int>(pbStream.column())),
                            pbBFIndex.bloomfilter(j)));
                }
                // add bloom filters to result for one column
                bloomFilterIndex[pbStream.column()] = std::move(bfIndex);
            }
        }
        // Advance past this stream whether or not it was read.
        offset += pbStream.length();
    }
}

void RowReaderImpl::seekToRowGroup(uint32_t rowGroupEntryId) {
    // store positions for selected columns
    std::vector<std::list<uint64_t>> positions;
    // store position providers for selected colimns
    std::unordered_map<uint64_t, PositionProvider> positionProviders;

    for (const auto& rowIndexe : rowIndexes) {
        uint64_t colId = rowIndexe.first;
        const proto::RowIndexEntry& entry = rowIndexe.second.entry(static_cast<int32_t>(rowGroupEntryId));

        // copy index positions for a specific column
        positions.emplace_back();
        auto& position = positions.back();
        for (int pos = 0; pos != entry.positions_size(); ++pos) {
            position.push_back(entry.positions(pos));
        }
        positionProviders.insert(std::make_pair(colId, PositionProvider(position)));
    }

    reader->seekToRowGroup(positionProviders);
}

// Returns the file contents shared with the reader that created this object.
const FileContents& RowReaderImpl::getFileContents() const {
    const FileContents& fileContents = *contents;
    return fileContents;
}

// Returns the throwOnHive11DecimalOverflow setting captured from the options.
bool RowReaderImpl::getThrowOnHive11DecimalOverflow() const {
    const bool shouldThrow = throwOnHive11DecimalOverflow;
    return shouldThrow;
}

// Returns the forcedScaleOnHive11Decimal setting captured from the options.
int32_t RowReaderImpl::getForcedScaleOnHive11Decimal() const {
    const int32_t forcedScale = forcedScaleOnHive11Decimal;
    return forcedScale;
}

// Returns the useWriterTimezone setting captured from the options.
bool RowReaderImpl::getUseWriterTimezone() const {
    const bool writerTimezoneRequested = useWriterTimezone;
    return writerTimezoneRequested;
}

// Returns a pointer to this reader's shared scratch buffer.
DataBuffer<char>* RowReaderImpl::getSharedBuffer() const {
    DataBuffer<char>* const buffer = &sharedBuffer;
    return buffer;
}

// Reads and parses the footer of the stripe described by `info`.
//
// The footer sits after the stripe's index and data sections. Throws
// ParseError when the footer cannot be parsed or when its number of column
// encodings differs from the number of types in the file footer.
proto::StripeFooter getStripeFooter(const proto::StripeInformation& info, const FileContents& contents) {
    const uint64_t footerStart = info.offset() + info.indexlength() + info.datalength();
    const uint64_t footerLength = info.footerlength();

    std::unique_ptr<SeekableInputStream> rawStream(
            new SeekableFileInputStream(contents.stream.get(), footerStart, footerLength, *contents.pool));
    std::unique_ptr<SeekableInputStream> pbStream =
            createDecompressor(contents.compression, std::move(rawStream), contents.blockSize, *contents.pool);

    proto::StripeFooter stripeFooter;
    const bool parsed = stripeFooter.ParseFromZeroCopyStream(pbStream.get());
    if (!parsed) {
        throw ParseError(std::string("bad StripeFooter from ") + pbStream->getName());
    }

    // Verify StripeFooter in case it's corrupt
    const auto expectedColumns = contents.footer->types_size();
    const auto actualColumns = stripeFooter.columns_size();
    if (actualColumns != expectedColumns) {
        std::stringstream msg;
        msg << "bad number of ColumnEncodings in StripeFooter: expected=" << expectedColumns
            << ", actual=" << actualColumns;
        throw ParseError(msg.str());
    }
    return stripeFooter;
}

// Takes ownership of the parsed file tail and derives the remaining file
// contents from it: the schema (from footer type 0), the compression block
// size and the compression kind. A warning about unknown format versions is
// emitted via checkOrcVersion().
ReaderImpl::ReaderImpl(std::shared_ptr<FileContents> _contents, const ReaderOptions& opts, uint64_t _fileLength,
                       uint64_t _postscriptLength)
        : contents(std::move(_contents)),
          options(opts),
          fileLength(_fileLength),
          postscriptLength(_postscriptLength),
          footer(contents->footer.get()) {
    // Stripe statistics are read lazily by readMetadata().
    isMetadataLoaded = false;
    checkOrcVersion();
    numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
    contents->schema = REDUNDANT_MOVE(convertType(footer->types(0), *footer));

    const proto::PostScript& postscript = *contents->postscript;
    contents->blockSize = getCompressionBlockSize(postscript);
    contents->compression = convertCompressionKind(postscript);
}

std::string ReaderImpl::getSerializedFileTail() const {
    proto::FileTail tail;
    proto::PostScript* mutable_ps = tail.mutable_postscript();
    mutable_ps->CopyFrom(*contents->postscript);
    proto::Footer* mutableFooter = tail.mutable_footer();
    mutableFooter->CopyFrom(*footer);
    tail.set_filelength(fileLength);
    tail.set_postscriptlength(postscriptLength);
    std::string result;
    if (!tail.SerializeToString(&result)) {
        throw ParseError("Failed to serialize file tail");
    }
    return result;
}

// Returns the options this reader was created with.
const ReaderOptions& ReaderImpl::getReaderOptions() const {
    const ReaderOptions& readerOptions = options;
    return readerOptions;
}

// Returns the compression kind stored in the file contents.
CompressionKind ReaderImpl::getCompression() const {
    const FileContents& fileContents = *contents;
    return fileContents.compression;
}

uint64_t ReaderImpl::getCompressionSize() const {
    return contents->blockSize;
}

// Returns the stripe count cached at construction time.
uint64_t ReaderImpl::getNumberOfStripes() const {
    const uint64_t stripeCount = numberOfStripes;
    return stripeCount;
}

// Returns the number of stripe statistics entries in the file metadata,
// loading the metadata on first use; 0 when the file has no metadata.
uint64_t ReaderImpl::getNumberOfStripeStatistics() const {
    if (!isMetadataLoaded) {
        readMetadata();
    }
    if (metadata == nullptr) {
        return 0;
    }
    return static_cast<uint64_t>(metadata->stripestats_size());
}

std::unique_ptr<StripeInformation> ReaderImpl::getStripe(uint64_t stripeIndex) const {
    if (stripeIndex > getNumberOfStripes()) {
        throw std::logic_error("stripe index out of range");
    }
    proto::StripeInformation stripeInfo = footer->stripes(static_cast<int>(stripeIndex));

    return std::unique_ptr<StripeInformation>(
            new StripeInformationImpl(stripeInfo.offset(), stripeInfo.indexlength(), stripeInfo.datalength(),
                                      stripeInfo.footerlength(), stripeInfo.numberofrows(), contents->stream.get(),
                                      *contents->pool, contents->compression, contents->blockSize));
}

// Returns the file format version from the postscript. A postscript that
// does not carry exactly two version components is reported as 0.11.
FileVersion ReaderImpl::getFormatVersion() const {
    const auto& postscript = *contents->postscript;
    if (postscript.version_size() == 2) {
        return FileVersion(postscript.version(0), postscript.version(1));
    }
    return FileVersion::v_0_11();
}

// Returns the total row count recorded in the file footer.
uint64_t ReaderImpl::getNumberOfRows() const {
    const uint64_t rowCount = footer->numberofrows();
    return rowCount;
}

// Maps the footer's writer field to a WriterId. A missing field means
// ORC_JAVA_WRITER; values above TRINO_WRITER map to UNKNOWN_WRITER.
WriterId ReaderImpl::getWriterId() const {
    if (!footer->has_writer()) {
        return WriterId::ORC_JAVA_WRITER;
    }
    const uint32_t rawWriterId = footer->writer();
    if (rawWriterId > WriterId::TRINO_WRITER) {
        return WriterId::UNKNOWN_WRITER;
    }
    return static_cast<WriterId>(rawWriterId);
}

// Returns the raw writer id from the footer, or ORC_JAVA_WRITER when the
// footer has no writer field.
uint32_t ReaderImpl::getWriterIdValue() const {
    if (!footer->has_writer()) {
        return WriterId::ORC_JAVA_WRITER;
    }
    return footer->writer();
}

// Returns the writer version derived from the postscript.
WriterVersion ReaderImpl::getWriterVersion() const {
    const FileContents* fileContents = contents.get();
    return getWriterVersionImpl(fileContents);
}

// Returns the content length recorded in the file footer.
uint64_t ReaderImpl::getContentLength() const {
    const uint64_t contentLength = footer->contentlength();
    return contentLength;
}

// Returns the metadata length recorded in the postscript.
uint64_t ReaderImpl::getStripeStatisticsLength() const {
    const auto& postscript = *contents->postscript;
    return postscript.metadatalength();
}

// Returns the footer length recorded in the postscript.
uint64_t ReaderImpl::getFileFooterLength() const {
    const auto& postscript = *contents->postscript;
    return postscript.footerlength();
}

// Returns the postscript length supplied at construction time.
uint64_t ReaderImpl::getFilePostscriptLength() const {
    const uint64_t length = postscriptLength;
    return length;
}

// Returns the file length supplied at construction time.
uint64_t ReaderImpl::getFileLength() const {
    const uint64_t length = fileLength;
    return length;
}

// Returns the row index stride recorded in the file footer.
uint64_t ReaderImpl::getRowIndexStride() const {
    const uint64_t stride = footer->rowindexstride();
    return stride;
}

// Returns the name reported by the underlying input stream.
const std::string& ReaderImpl::getStreamName() const {
    const std::string& streamName = contents->stream->getName();
    return streamName;
}

// Returns the names of all user metadata entries, in footer order.
std::list<std::string> ReaderImpl::getMetadataKeys() const {
    std::list<std::string> keys;
    for (const auto& item : footer->metadata()) {
        keys.push_back(item.name());
    }
    return keys;
}

// Returns the value of the first user metadata entry named `key`.
// Throws std::range_error when no entry has that name.
std::string ReaderImpl::getMetadataValue(const std::string& key) const {
    for (const auto& item : footer->metadata()) {
        if (item.name() == key) {
            return item.value();
        }
    }
    throw std::range_error("key not found");
}

// Collects the per-row-group column statistics of one stripe.
//
// Walks the stripe's streams in footer order, tracking each stream's file
// offset, and for every ROW_INDEX stream appends the statistics of each
// index entry to (*indexStats)[column].
//
// @param stripeInfo           location and lengths of the stripe
// @param stripeIndex          index of the stripe; used in error messages only
// @param currentStripeFooter  parsed footer of the stripe
// @param indexStats           output; must already have one slot per column
// Throws ParseError when a ROW_INDEX stream extends past the stripe's index
// section, names a column with no slot in indexStats, or cannot be parsed.
void ReaderImpl::getRowIndexStatistics(const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
                                       const proto::StripeFooter& currentStripeFooter,
                                       std::vector<std::vector<proto::ColumnStatistics>>* indexStats) const {
    int num_streams = currentStripeFooter.streams_size();
    uint64_t offset = stripeInfo.offset();
    uint64_t indexEnd = stripeInfo.offset() + stripeInfo.indexlength();
    for (int i = 0; i < num_streams; i++) {
        const proto::Stream& stream = currentStripeFooter.streams(i);
        StreamKind streamKind = static_cast<StreamKind>(stream.kind());
        uint64_t length = static_cast<uint64_t>(stream.length());
        if (streamKind == StreamKind::StreamKind_ROW_INDEX) {
            if (offset + length > indexEnd) {
                std::stringstream msg;
                msg << "Malformed RowIndex stream meta in stripe " << stripeIndex << ": streamOffset=" << offset
                    << ", streamLength=" << length << ", stripeOffset=" << stripeInfo.offset()
                    << ", stripeIndexLength=" << stripeInfo.indexlength();
                throw ParseError(msg.str());
            }
            // The column id comes from the file; reject ids that have no
            // slot in the output instead of writing out of bounds.
            size_t column = static_cast<size_t>(stream.column());
            if (column >= indexStats->size()) {
                std::stringstream msg;
                msg << "Malformed RowIndex stream meta in stripe " << stripeIndex << ": column=" << column
                    << ", numColumns=" << indexStats->size();
                throw ParseError(msg.str());
            }
            std::unique_ptr<SeekableInputStream> pbStream =
                    createDecompressor(contents->compression,
                                       std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
                                               contents->stream.get(), offset, length, *contents->pool)),
                                       contents->blockSize, *(contents->pool));

            proto::RowIndex rowIndex;
            if (!rowIndex.ParseFromZeroCopyStream(pbStream.get())) {
                throw ParseError("Failed to parse RowIndex from stripe footer");
            }
            int num_entries = rowIndex.entry_size();
            for (int j = 0; j < num_entries; j++) {
                const proto::RowIndexEntry& entry = rowIndex.entry(j);
                (*indexStats)[column].push_back(entry.statistics());
            }
        }
        // Advance past this stream whether or not it was a row index.
        offset += length;
    }
}

// Returns true when the footer has a user metadata entry named `key`.
bool ReaderImpl::hasMetadataValue(const std::string& key) const {
    bool found = false;
    for (const auto& item : footer->metadata()) {
        if (item.name() == key) {
            found = true;
            break;
        }
    }
    return found;
}

// Returns the root type of the file schema.
const Type& ReaderImpl::getType() const {
    const Type& rootType = *contents->schema;
    return rootType;
}

std::unique_ptr<StripeStatistics> ReaderImpl::getStripeStatistics(uint64_t stripeIndex) const {
    if (!isMetadataLoaded) {
        readMetadata();
    }
    if (metadata == nullptr) {
        throw std::logic_error("No stripe statistics in file");
    }
    size_t num_cols = static_cast<size_t>(metadata->stripestats(static_cast<int>(stripeIndex)).colstats_size());
    std::vector<std::vector<proto::ColumnStatistics>> indexStats(num_cols);

    proto::StripeInformation currentStripeInfo = footer->stripes(static_cast<int>(stripeIndex));
    proto::StripeFooter currentStripeFooter = getStripeFooter(currentStripeInfo, *contents);

    getRowIndexStatistics(currentStripeInfo, stripeIndex, currentStripeFooter, &indexStats);

    const Timezone& writerTZ = currentStripeFooter.has_writertimezone()
                                       ? getTimezoneByName(currentStripeFooter.writertimezone())
                                       : getLocalTimezone();
    StatContext statContext(hasCorrectStatistics(), &writerTZ);
    return std::unique_ptr<StripeStatistics>(
            new StripeStatisticsImpl(metadata->stripestats(static_cast<int>(stripeIndex)), indexStats, statContext));
}

// Builds the file-level statistics from the footer.
std::unique_ptr<Statistics> ReaderImpl::getStatistics() const {
    StatContext statContext(hasCorrectStatistics());
    std::unique_ptr<Statistics> fileStatistics(new StatisticsImpl(*footer, statContext));
    return fileStatistics;
}

std::unique_ptr<ColumnStatistics> ReaderImpl::getColumnStatistics(uint32_t index) const {
    if (index >= static_cast<uint64_t>(footer->statistics_size())) {
        throw std::logic_error("column index out of range");
    }
    proto::ColumnStatistics col = footer->statistics(static_cast<int32_t>(index));

    StatContext statContext(hasCorrectStatistics());
    return std::unique_ptr<ColumnStatistics>(convertColumnStatistics(col, statContext));
}

void ReaderImpl::readMetadata() const {
    uint64_t metadataSize = contents->postscript->metadatalength();
    uint64_t footerLength = contents->postscript->footerlength();
    if (fileLength < metadataSize + footerLength + postscriptLength + 1) {
        std::stringstream msg;
        msg << "Invalid Metadata length: fileLength=" << fileLength << ", metadataLength=" << metadataSize
            << ", footerLength=" << footerLength << ", postscriptLength=" << postscriptLength;
        throw ParseError(msg.str());
    }
    uint64_t metadataStart = fileLength - metadataSize - footerLength - postscriptLength - 1;
    if (metadataSize != 0) {
        std::unique_ptr<SeekableInputStream> pbStream =
                createDecompressor(contents->compression,
                                   std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
                                           contents->stream.get(), metadataStart, metadataSize, *contents->pool)),
                                   contents->blockSize, *contents->pool);
        metadata.reset(new proto::Metadata());
        if (!metadata->ParseFromZeroCopyStream(pbStream.get())) {
            throw ParseError("Failed to parse the metadata");
        }
    }
    isMetadataLoaded = true;
}

// Returns true unless VERSION_HIVE_8732 compares greater than the file's
// writer version.
bool ReaderImpl::hasCorrectStatistics() const {
    const bool writerPredatesFix = WriterVersionImpl::VERSION_HIVE_8732().compareGT(getWriterVersion());
    return !writerPredatesFix;
}

void ReaderImpl::checkOrcVersion() {
    FileVersion version = getFormatVersion();
    if (version != FileVersion(0, 11) && version != FileVersion(0, 12)) {
        *(options.getErrorStream()) << "Warning: ORC file " << contents->stream->getName()
                                    << " was written in an unknown format version " << version.toString() << "\n";
    }
}

std::unique_ptr<RowReader> ReaderImpl::createRowReader() const {
    RowReaderOptions defaultOpts;
    return createRowReader(defaultOpts);
}

// Creates a row reader over this reader's file contents with the given options.
std::unique_ptr<RowReader> ReaderImpl::createRowReader(const RowReaderOptions& opts) const {
    std::unique_ptr<RowReader> rowReader(new RowReaderImpl(contents, opts));
    return rowReader;
}

// Returns the stream count assumed for a column of the given type when
// estimating memory use; kinds not listed below yield 0.
uint64_t maxStreamsForType(const proto::Type& type) {
    uint64_t streamCount = 0;
    switch (static_cast<int64_t>(type.kind())) {
    case proto::Type_Kind_STRUCT:
        streamCount = 1;
        break;
    case proto::Type_Kind_INT:
    case proto::Type_Kind_LONG:
    case proto::Type_Kind_SHORT:
    case proto::Type_Kind_FLOAT:
    case proto::Type_Kind_DOUBLE:
    case proto::Type_Kind_BOOLEAN:
    case proto::Type_Kind_BYTE:
    case proto::Type_Kind_DATE:
    case proto::Type_Kind_LIST:
    case proto::Type_Kind_MAP:
    case proto::Type_Kind_UNION:
        streamCount = 2;
        break;
    case proto::Type_Kind_BINARY:
    case proto::Type_Kind_DECIMAL:
    case proto::Type_Kind_TIMESTAMP:
    case proto::Type_Kind_TIMESTAMP_INSTANT:
        streamCount = 3;
        break;
    case proto::Type_Kind_CHAR:
    case proto::Type_Kind_STRING:
    case proto::Type_Kind_VARCHAR:
        streamCount = 4;
        break;
    default:
        break;
    }
    return streamCount;
}

// Estimates memory use for stripe `stripeIx` with every column selected.
uint64_t ReaderImpl::getMemoryUse(int stripeIx) {
    const size_t typeCount = static_cast<size_t>(contents->footer->types_size());
    std::vector<bool> allColumns(typeCount, true);
    return getMemoryUse(stripeIx, allColumns);
}

uint64_t ReaderImpl::getMemoryUseByFieldId(const std::list<uint64_t>& include, int stripeIx) {
    std::vector<bool> selectedColumns;
    selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
    ColumnSelector column_selector(contents.get());
    if (contents->schema->getKind() == STRUCT && include.begin() != include.end()) {
        for (unsigned long field : include) {
            column_selector.updateSelectedByFieldId(selectedColumns, field);
        }
    } else {
        // default is to select all columns
        std::fill(selectedColumns.begin(), selectedColumns.end(), true);
    }
    column_selector.selectParents(selectedColumns, *contents->schema);
    selectedColumns[0] = true; // column 0 is selected by default
    return getMemoryUse(stripeIx, selectedColumns);
}

uint64_t ReaderImpl::getMemoryUseByName(const std::list<std::string>& names, int stripeIx) {
    std::vector<bool> selectedColumns;
    selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
    ColumnSelector column_selector(contents.get());
    if (contents->schema->getKind() == STRUCT && names.begin() != names.end()) {
        for (const auto& name : names) {
            column_selector.updateSelectedByName(selectedColumns, name);
        }
    } else {
        // default is to select all columns
        std::fill(selectedColumns.begin(), selectedColumns.end(), true);
    }
    column_selector.selectParents(selectedColumns, *contents->schema);
    selectedColumns[0] = true; // column 0 is selected by default
    return getMemoryUse(stripeIx, selectedColumns);
}

uint64_t ReaderImpl::getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx) {
    std::vector<bool> selectedColumns;
    selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
    ColumnSelector column_selector(contents.get());
    if (include.begin() != include.end()) {
        for (unsigned long field : include) {
            column_selector.updateSelectedByTypeId(selectedColumns, field);
        }
    } else {
        // default is to select all columns
        std::fill(selectedColumns.begin(), selectedColumns.end(), true);
    }
    column_selector.selectParents(selectedColumns, *contents->schema);
    selectedColumns[0] = true; // column 0 is selected by default
    return getMemoryUse(stripeIx, selectedColumns);
}

// Estimates the memory needed to read the selected columns.
//
// `stripeIx` picks the stripe whose data length is used; any index outside
// [0, stripes_size()) means "use the largest stripe in the file". The result
// combines a stream-buffer estimate (raised to cover the footer and the
// metadata if those are larger), the firstRowOfStripe array, and the
// decompressor buffers.
uint64_t ReaderImpl::getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns) {
    const int stripeCount = footer->stripes_size();
    const int typeCount = footer->types_size();

    // Data length of the requested stripe, or the maximum over all stripes.
    uint64_t maxDataLength = 0;
    if (stripeIx >= 0 && stripeIx < stripeCount) {
        maxDataLength = footer->stripes(stripeIx).datalength();
    } else {
        for (int s = 0; s < stripeCount; ++s) {
            const uint64_t dataLength = footer->stripes(s).datalength();
            if (dataLength > maxDataLength) {
                maxDataLength = dataLength;
            }
        }
    }

    // Count streams of selected columns; the scan stops at the first
    // string-like column because the stream count is not used in that case.
    bool hasStringColumn = false;
    uint64_t nSelectedStreams = 0;
    for (int t = 0; t < typeCount && !hasStringColumn; ++t) {
        if (!selectedColumns[static_cast<size_t>(t)]) {
            continue;
        }
        const proto::Type& type = footer->types(t);
        nSelectedStreams += maxStreamsForType(type);
        const int64_t kind = static_cast<int64_t>(type.kind());
        hasStringColumn = kind == proto::Type_Kind_CHAR || kind == proto::Type_Kind_STRING ||
                          kind == proto::Type_Kind_VARCHAR || kind == proto::Type_Kind_BINARY;
    }

    /* If a string column is read, use stripe datalength as a memory estimate
     * because we don't know the dictionary size. Multiply by 2 because
     * a string column requires two buffers:
     * in the input stream and in the seekable input stream.
     * If no string column is read, estimate from the number of streams.
     */
    uint64_t memory;
    if (hasStringColumn) {
        memory = 2 * maxDataLength;
    } else {
        memory = std::min(uint64_t(maxDataLength), nSelectedStreams * contents->stream->getNaturalReadSize());
    }

    // Do we need even more memory to read the footer or the metadata?
    const uint64_t footerNeed = contents->postscript->footerlength() + DIRECTORY_SIZE_GUESS;
    if (footerNeed > memory) {
        memory = footerNeed;
    }
    const uint64_t metadataNeed = contents->postscript->metadatalength();
    if (metadataNeed > memory) {
        memory = metadataNeed;
    }

    // Account for firstRowOfStripe.
    memory += static_cast<uint64_t>(stripeCount) * sizeof(uint64_t);

    // Decompressors need buffers for each stream
    uint64_t decompressorMemory = 0;
    if (contents->compression != CompressionKind_NONE) {
        for (int t = 0; t < typeCount; ++t) {
            if (!selectedColumns[static_cast<size_t>(t)]) {
                continue;
            }
            decompressorMemory += maxStreamsForType(footer->types(t)) * contents->blockSize;
        }
        if (contents->compression == CompressionKind_SNAPPY) {
            decompressorMemory *= 2; // Snappy decompressor uses a second buffer
        }
    }

    return memory + decompressorMemory;
}

/**
 * Position the row reader on the next stripe that still has rows to read.
 *
 * Starting at currentStripe, each candidate stripe is checked against the
 * file length and then run through the optional filters: the row reader
 * filter callbacks and the search-argument (sargs) row group selection.
 * A stripe that is filtered out is skipped by advancing currentStripe and
 * resetting currentRowInStripe to 0. The loop stops as soon as a stripe is
 * accepted (a column reader has been built for it) or when currentStripe
 * reaches lastStripe, in which case `reader` stays reset.
 *
 * Throws ParseError when the extent recorded for a stripe does not end
 * before the end of the file.
 */
void RowReaderImpl::startNextStripe() {
    reader.reset(); // ColumnReaders use lots of memory; free old memory first
    // Drop the index data that was loaded for the previous stripe.
    rowIndexes.clear();
    bloomFilterIndex.clear();

    while (currentStripe < lastStripe) {
        currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
        uint64_t fileLength = contents->stream->getLength();
        // Sanity check: offset + index + data + footer must end strictly before the end of the file.
        if (currentStripeInfo.offset() + currentStripeInfo.indexlength() + currentStripeInfo.datalength() +
                    currentStripeInfo.footerlength() >=
            fileLength) {
            std::stringstream msg;
            msg << "Malformed StripeInformation at stripe index " << currentStripe << ": fileLength=" << fileLength
                << ", StripeInfo=(offset=" << currentStripeInfo.offset()
                << ", indexLength=" << currentStripeInfo.indexlength()
                << ", dataLength=" << currentStripeInfo.datalength()
                << ", footerLength=" << currentStripeInfo.footerlength() << ")";
            throw ParseError(msg.str());
        }

        bool skipStripe = false;
        // First filter opportunity: the row reader filter sees only the stripe information,
        // before the stripe footer is read.
        if (sargsApplier && sargsApplier->getRowReaderFilter()) {
            if (sargsApplier->getRowReaderFilter()->filterOnOpeningStripe(currentStripe, &currentStripeInfo)) {
                skipStripe = true;
                goto end;
            }
        }

        currentStripeFooter = getStripeFooter(currentStripeInfo, *contents);
        rowsInCurrentStripe = currentStripeInfo.numberofrows();

        if (sargsApplier) {
            // read row group statistics and bloom filters of current stripe
            loadStripeIndex();

            // select row groups to read in the current stripe
            sargsApplier->pickRowGroups(rowsInCurrentStripe, rowIndexes, bloomFilterIndex);
            // Skip the stripe when nothing is selected from the current row onwards.
            if (!sargsApplier->hasSelectedFrom(currentRowInStripe)) {
                skipStripe = true;
                goto end;
            }
        }
        // NOTE(review): the extra scope presumably exists so that the `goto end` jumps above
        // do not bypass the initialization of the locals declared below.
        {
            // get writer timezone info from stripe footer to help understand timestamp values.
            // Falls back to the local timezone when the stripe footer does not record one.
            const Timezone& writerTimezone = currentStripeFooter.has_writertimezone()
                                                     ? getTimezoneByName(currentStripeFooter.writertimezone())
                                                     : getLocalTimezone();
            StripeStreamsImpl stripeStreams(*this, currentStripe, currentStripeInfo, currentStripeFooter,
                                            currentStripeInfo.offset(), *contents->stream, writerTimezone,
                                            readerTimezone);
            reader = buildReader(*contents->schema, stripeStreams);

            if (sargsApplier) {
                // Last filter opportunity: let the row reader filter inspect the string
                // dictionaries collected from the freshly built column readers.
                if (sargsApplier->getRowReaderFilter()) {
                    std::unordered_map<uint64_t, StringDictionary*> sdicts;
                    collectStringDictionary(reader.get(), sdicts);
                    if (sargsApplier->getRowReaderFilter()->filterOnPickStringDictionary(sdicts)) {
                        skipStripe = true;
                        // The reader built above belongs to a stripe that is being skipped.
                        reader.reset();
                        goto end;
                    }
                }

                // move to the 1st selected row group when PPD is enabled.
                currentRowInStripe = advanceToNextRowGroup(currentRowInStripe, rowsInCurrentStripe,
                                                           footer->rowindexstride(), sargsApplier->getRowGroups());
                // previousRow is set to one before the first row that will be read from this stripe.
                previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe - 1;
                if (currentRowInStripe > 0) {
                    seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride()));
                }
            }
        }

    end:
        // Common exit of one loop iteration: either move on to the next stripe or stop here.
        if (skipStripe) {
            // advance to next stripe when current stripe has no matching rows
            currentStripe += 1;
            currentRowInStripe = 0;
        } else {
            break;
        }
    }
}

/**
 * Read the next batch of rows into `data`.
 *
 * At most data.capacity rows are read, and a batch never crosses a stripe
 * boundary. When a sargs applier is present the batch is additionally limited
 * to the current run of selected row groups, and the reader is moved to the
 * next selected row group afterwards.
 *
 * @param data the batch to fill; data.numElements receives the row count
 * @return true if at least one row was read, false if no rows are left
 */
bool RowReaderImpl::next(ColumnVectorBatch& data) {
    // Every stripe has already been consumed.
    if (currentStripe >= lastStripe) {
        data.numElements = 0;
        previousRow = lastStripe > 0 ? firstRowOfStripe[lastStripe - 1] +
                                               footer->stripes(static_cast<int>(lastStripe - 1)).numberofrows()
                                     : 0;
        return false;
    }

    // A row position of 0 means no stripe is open yet (or the previous one was finished).
    if (currentRowInStripe == 0) {
        startNextStripe();
    }

    // startNextStripe() may have skipped all remaining stripes; in that case nothing is read.
    uint64_t batchRows = 0;
    if (currentStripe < lastStripe) {
        batchRows = std::min(static_cast<uint64_t>(data.capacity), rowsInCurrentStripe - currentRowInStripe);
        if (sargsApplier) {
            batchRows = computeBatchSize(batchRows, currentRowInStripe, rowsInCurrentStripe,
                                         footer->rowindexstride(), sargsApplier->getRowGroups());
        }
    }

    data.numElements = batchRows;
    if (batchRows == 0) {
        if (lastStripe <= 0) {
            previousRow = footer->numberofrows();
        } else {
            previousRow = firstRowOfStripe[lastStripe - 1] +
                          footer->stripes(static_cast<int>(lastStripe - 1)).numberofrows();
        }
        return false;
    }

    // Decode the rows, either in encoded form or fully materialized.
    if (enableEncodedBlock) {
        reader->nextEncoded(data, batchRows, nullptr);
    } else {
        reader->next(data, batchRows, nullptr);
    }

    // Record the file-level row number of the first row of this batch, then move past the batch.
    previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe;
    currentRowInStripe += batchRows;

    // With row group selection active, jump over any row groups that were not selected.
    if (sargsApplier) {
        const uint64_t resumeRow = advanceToNextRowGroup(currentRowInStripe, rowsInCurrentStripe,
                                                         footer->rowindexstride(), sargsApplier->getRowGroups());
        if (resumeRow != currentRowInStripe) {
            // The new position is the start of a row group (or the end of the stripe).
            currentRowInStripe = resumeRow;
            if (currentRowInStripe < rowsInCurrentStripe) {
                seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride()));
            }
        }
    }

    // Stripe exhausted: the next call opens the following stripe.
    if (currentRowInStripe >= rowsInCurrentStripe) {
        currentStripe += 1;
        currentRowInStripe = 0;
    }

    // batchRows is non-zero here; the zero case returned above.
    return true;
}

/**
 * Limit a requested batch size so that the batch stays inside the current run
 * of consecutively selected row groups.
 *
 * When `includedRowGroups` is empty no row group selection applies and the
 * batch is limited only by the rows remaining in the stripe. Otherwise the
 * batch ends at the end of the last selected row group in the run that starts
 * at the row group containing `currentRowInStripe`, capped at the stripe end.
 *
 * @return the number of rows to read, at most `requestedSize`
 */
uint64_t RowReaderImpl::computeBatchSize(uint64_t requestedSize, uint64_t currentRowInStripe,
                                         uint64_t rowsInCurrentStripe, uint64_t rowIndexStride,
                                         const std::vector<bool>& includedRowGroups) {
    if (includedRowGroups.empty()) {
        return std::min(requestedSize, rowsInCurrentStripe - currentRowInStripe);
    }

    // Extend the readable range one row group at a time while the groups are selected.
    uint64_t rangeEnd = currentRowInStripe;
    uint32_t group = static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
    while (group < includedRowGroups.size() && includedRowGroups[group]) {
        rangeEnd = std::min(rowsInCurrentStripe, (group + 1) * rowIndexStride);
        ++group;
    }
    return std::min(requestedSize, rangeEnd - currentRowInStripe);
}

/**
 * Find the next row position that lies in a selected row group.
 *
 * If the row group containing `currentRowInStripe` is selected, the position
 * is returned unchanged. Otherwise the position moves to the start of each
 * following row group until a selected one is found. When no selected group
 * remains, or `includedRowGroups` is empty, the result is the position capped
 * at `rowsInCurrentStripe`.
 */
uint64_t RowReaderImpl::advanceToNextRowGroup(uint64_t currentRowInStripe, uint64_t rowsInCurrentStripe,
                                              uint64_t rowIndexStride, const std::vector<bool>& includedRowGroups) {
    if (includedRowGroups.empty()) {
        return std::min(currentRowInStripe, rowsInCurrentStripe);
    }

    uint64_t row = currentRowInStripe;
    uint32_t group = static_cast<uint32_t>(row / rowIndexStride);
    while (group < includedRowGroups.size()) {
        if (includedRowGroups[group]) {
            return row;
        }
        // Not selected: continue from the first row of the following row group.
        row = (group + 1) * rowIndexStride;
        ++group;
    }
    return std::min(row, rowsInCurrentStripe);
}

std::unique_ptr<ColumnVectorBatch> RowReaderImpl::createRowBatch(uint64_t capacity) const {
    return getSelectedType().createRowBatch(capacity, *contents->pool, enableEncodedBlock);
}

/**
 * Verify that the file carries the "ORC" magic string.
 *
 * The magic is first looked for in the buffered tail, in the bytes that
 * immediately precede the buffer's final byte. If it is not found there, the
 * first bytes of the stream are checked instead, because files written by
 * Hive 0.11.0 carry the magic only at the beginning.
 *
 * @param stream the file stream, used for the fallback read at offset 0
 * @param buffer the buffer with the tail of the file
 * @param postscriptLength the length of the postscript in bytes
 * @throws ParseError if the postscript or the buffer is too short to hold the
 *         magic, or if the magic is found at neither location
 */
void ensureOrcFooter(InputStream* stream, DataBuffer<char>* buffer, uint64_t postscriptLength) {
    const std::string MAGIC("ORC");
    const uint64_t magicLength = MAGIC.length();
    const char* const bufferStart = buffer->data();
    const uint64_t bufferLength = buffer->size();

    // The magic is followed by one trailing byte, so the buffer has to hold
    // magicLength + 1 bytes; with fewer, magicStart below would point before
    // the start of the buffer and memcmp would read out of bounds.
    if (postscriptLength < magicLength || bufferLength < magicLength + 1) {
        throw ParseError("Invalid ORC postscript length");
    }
    const char* magicStart = bufferStart + bufferLength - 1 - magicLength;

    // Look for the magic string at the end of the postscript.
    if (memcmp(magicStart, MAGIC.c_str(), magicLength) != 0) {
        // If there is no magic string at the end, check the beginning.
        // Only files written by Hive 0.11.0 don't have the tail ORC string.
        std::unique_ptr<char[]> frontBuffer(new char[magicLength]);
        stream->read(frontBuffer.get(), magicLength, 0);
        bool foundMatch = memcmp(frontBuffer.get(), MAGIC.c_str(), magicLength) == 0;

        if (!foundMatch) {
            throw ParseError("Not an ORC file");
        }
    }
}

/**
   * Parse the file's postscript out of the buffered file tail.
   * The postscript occupies the postscriptSize bytes that precede the
   * final byte of the buffer.
   * @param stream the file stream
   * @param buffer the buffer with the tail of the file.
   * @param postscriptSize the length of postscript in bytes
   * @return the parsed postscript
   * @throws ParseError if the magic check fails, the buffer is shorter than
   *         the postscript plus its trailing byte, or parsing fails
   */
std::unique_ptr<proto::PostScript> readPostscript(InputStream* stream, DataBuffer<char>* buffer,
                                                  uint64_t postscriptSize) {
    char* const tail = buffer->data();
    const uint64_t tailLength = buffer->size();

    // Reject anything that does not carry the ORC magic before parsing.
    ensureOrcFooter(stream, buffer, postscriptSize);

    std::unique_ptr<proto::PostScript> postscript(new proto::PostScript());

    // The buffer must hold the whole postscript plus the byte that follows it.
    const bool tailTooShort = tailLength < 1 + postscriptSize;
    if (tailTooShort) {
        std::stringstream msg;
        msg << "Invalid ORC postscript length: " << postscriptSize << ", file length = " << stream->getLength();
        throw ParseError(msg.str());
    }

    const char* const postscriptStart = tail + tailLength - 1 - postscriptSize;
    const bool parsed = postscript->ParseFromArray(postscriptStart, static_cast<int>(postscriptSize));
    if (!parsed) {
        throw ParseError("Failed to parse the postscript from " + stream->getName());
    }
    return REDUNDANT_MOVE(postscript);
}

/**
   * Validate the type tree stored in the footer so that converting the
   * proto::Types to TypeImpls cannot run into invalid indices (ORC-317).
   *
   * The checks are:
   *  - the footer has at least one type;
   *  - a STRUCT type has as many fieldNames as subTypes (ORC-581);
   *  - each subType id is greater than the id of its parent type;
   *  - each subType id refers to an existing type;
   *  - the subType ids of one type are strictly increasing.
   *
   * Throws ParseError on the first violated check.
   */
void checkProtoTypes(const proto::Footer& footer) {
    const int typeCount = footer.types_size();
    if (typeCount <= 0) {
        throw ParseError("Footer is corrupt: no types found");
    }

    for (int typeId = 0; typeId < typeCount; ++typeId) {
        const proto::Type& type = footer.types(typeId);
        const int childCount = type.subtypes_size();

        const bool isStruct = type.kind() == proto::Type_Kind_STRUCT;
        if (isStruct && childCount != type.fieldnames_size()) {
            std::stringstream msg;
            msg << "Footer is corrupt: STRUCT type " << typeId << " has " << childCount << " subTypes, but has "
                << type.fieldnames_size() << " fieldNames";
            throw ParseError(msg.str());
        }

        for (int child = 0; child < childCount; ++child) {
            const int childId = static_cast<int>(type.subtypes(child));

            // A child must come after its parent in the flattened type list.
            if (childId <= typeId) {
                std::stringstream msg;
                msg << "Footer is corrupt: malformed link from type " << typeId << " to " << childId;
                throw ParseError(msg.str());
            }
            // A child must refer to a type that is present in the footer.
            if (childId >= typeCount) {
                std::stringstream msg;
                msg << "Footer is corrupt: types(" << childId << ") not exists";
                throw ParseError(msg.str());
            }
            // Sibling ids must be strictly increasing.
            if (child > 0 && static_cast<int>(type.subtypes(child - 1)) >= childId) {
                std::stringstream msg;
                msg << "Footer is corrupt: subType(" << (child - 1) << ") >= subType(" << child << ") in types("
                    << typeId << "). (" << type.subtypes(child - 1) << " >= " << childId << ")";
                throw ParseError(msg.str());
            }
        }
    }
}

/**
   * Decompress and parse the file footer from the given buffer, then
   * validate its type tree.
   * @param stream the file's stream (used for the name in error messages)
   * @param buffer the buffer to parse the footer from
   * @param footerOffset the offset within the buffer that contains the footer
   * @param ps the file's postscript
   * @param memoryPool the memory pool to use
   * @return the parsed footer
   * @throws ParseError if the footer cannot be parsed or fails validation
   */
std::unique_ptr<proto::Footer> readFooter(InputStream* stream, const DataBuffer<char>* buffer, uint64_t footerOffset,
                                          const proto::PostScript& ps, MemoryPool& memoryPool) {
    const char* footerStart = buffer->data() + footerOffset;

    // Expose the footer bytes as a stream and wrap it in the decompressor named by the postscript.
    const CompressionKind compression = convertCompressionKind(ps);
    std::unique_ptr<SeekableInputStream> rawFooter(new SeekableArrayInputStream(footerStart, ps.footerlength()));
    std::unique_ptr<SeekableInputStream> footerStream =
            createDecompressor(compression, std::move(rawFooter), getCompressionBlockSize(ps), memoryPool);

    std::unique_ptr<proto::Footer> footer(new proto::Footer());
    const bool parsed = footer->ParseFromZeroCopyStream(footerStream.get());
    if (!parsed) {
        throw ParseError("Failed to parse the footer from " + stream->getName());
    }

    checkProtoTypes(*footer);
    return REDUNDANT_MOVE(footer);
}

std::unique_ptr<Reader> createReader(std::unique_ptr<InputStream> stream, const ReaderOptions& options) {
    std::shared_ptr<FileContents> contents = std::make_shared<FileContents>();
    contents->pool = options.getMemoryPool();
    contents->errorStream = options.getErrorStream();
    std::string serializedFooter = options.getSerializedFileTail();
    uint64_t fileLength;
    uint64_t postscriptLength;
    if (serializedFooter.length() != 0) {
        // Parse the file tail from the serialized one.
        proto::FileTail tail;
        if (!tail.ParseFromString(serializedFooter)) {
            throw ParseError("Failed to parse the file tail from string");
        }
        contents->postscript.reset(new proto::PostScript(tail.postscript()));
        contents->footer.reset(new proto::Footer(tail.footer()));
        fileLength = tail.filelength();
        postscriptLength = tail.postscriptlength();
    } else {
        // figure out the size of the file using the option or filesystem
        fileLength = std::min(options.getTailLocation(), static_cast<uint64_t>(stream->getLength()));

        // read last bytes into buffer to get PostScript
        uint64_t readSize = std::min(fileLength, DIRECTORY_SIZE_GUESS);
        if (readSize < 4) {
            throw ParseError("File size too small");
        }
        std::unique_ptr<DataBuffer<char>> buffer(new DataBuffer<char>(*contents->pool, readSize));
        stream->read(buffer->data(), readSize, fileLength - readSize);

        postscriptLength = buffer->data()[readSize - 1] & 0xff;
        contents->postscript = REDUNDANT_MOVE(readPostscript(stream.get(), buffer.get(), postscriptLength));
        uint64_t footerSize = contents->postscript->footerlength();
        uint64_t tailSize = 1 + postscriptLength + footerSize;
        if (tailSize >= fileLength) {
            std::stringstream msg;
            msg << "Invalid ORC tailSize=" << tailSize << ", fileLength=" << fileLength;
            throw ParseError(msg.str());
        }
        uint64_t footerOffset;

        if (tailSize > readSize) {
            buffer->resize(footerSize);
            stream->read(buffer->data(), footerSize, fileLength - tailSize);
            footerOffset = 0;
        } else {
            footerOffset = readSize - tailSize;
        }

        contents->footer = REDUNDANT_MOVE(
                readFooter(stream.get(), buffer.get(), footerOffset, *contents->postscript, *contents->pool));
    }
    contents->stream = std::move(stream);
    return std::unique_ptr<Reader>(new ReaderImpl(std::move(contents), options, fileLength, postscriptLength));
}

std::map<uint32_t, BloomFilterIndex> ReaderImpl::getBloomFilters(uint32_t stripeIndex,
                                                                 const std::set<uint32_t>& included) const {
    std::map<uint32_t, BloomFilterIndex> ret;

    // find stripe info
    if (stripeIndex >= static_cast<uint32_t>(footer->stripes_size())) {
        throw std::logic_error("Illegal stripe index: " + to_string(static_cast<int64_t>(stripeIndex)));
    }
    const proto::StripeInformation currentStripeInfo = footer->stripes(static_cast<int>(stripeIndex));
    const proto::StripeFooter currentStripeFooter = getStripeFooter(currentStripeInfo, *contents);

    // iterate stripe footer to get stream of bloomfilter
    uint64_t offset = static_cast<uint64_t>(currentStripeInfo.offset());
    for (int i = 0; i < currentStripeFooter.streams_size(); i++) {
        const proto::Stream& stream = currentStripeFooter.streams(i);
        uint32_t column = static_cast<uint32_t>(stream.column());
        uint64_t length = static_cast<uint64_t>(stream.length());

        // a bloom filter stream from a selected column is found
        if (stream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8 &&
            (included.empty() || included.find(column) != included.end())) {
            std::unique_ptr<SeekableInputStream> pbStream =
                    createDecompressor(contents->compression,
                                       std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
                                               contents->stream.get(), offset, length, *contents->pool)),
                                       contents->blockSize, *(contents->pool));

            proto::BloomFilterIndex pbBFIndex;
            if (!pbBFIndex.ParseFromZeroCopyStream(pbStream.get())) {
                throw ParseError("Failed to parse BloomFilterIndex");
            }

            BloomFilterIndex bfIndex;
            for (int j = 0; j < pbBFIndex.bloomfilter_size(); j++) {
                std::unique_ptr<BloomFilter> entry = BloomFilterUTF8Utils::deserialize(
                        stream.kind(), currentStripeFooter.columns(static_cast<int>(stream.column())),
                        pbBFIndex.bloomfilter(j));
                bfIndex.entries.push_back(std::shared_ptr<BloomFilter>(std::move(entry)));
            }

            // add bloom filters to result for one column
            ret[column] = bfIndex;
        }

        offset += length;
    }

    return ret;
}

// Out-of-line destructor definition; there is nothing to clean up explicitly.
RowReader::~RowReader() = default;

// Out-of-line destructor definition; there is nothing to clean up explicitly.
Reader::~Reader() = default;

// Out-of-line destructor definition; there is nothing to clean up explicitly.
InputStream::~InputStream() = default;

} // namespace orc
